/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.inlong.sort.pulsar.internal;

import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.pulsar.internal.ClosableBlockingQueue;
import org.apache.flink.streaming.connectors.pulsar.internal.ExceptionProxy;
import org.apache.flink.streaming.connectors.pulsar.internal.PoisonState;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarCommitCallback;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarTopicPartitionStateWithWatermarkGenerator;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarTopicState;
import org.apache.flink.streaming.connectors.pulsar.internal.SourceContextWatermarkOutputAdapter;
import org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils;
import org.apache.flink.streaming.connectors.pulsar.internal.TopicRange;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
import org.apache.flink.util.SerializedValue;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.shade.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.COMMITTED_OFFSETS_METRICS_GAUGE;
import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.CURRENT_OFFSETS_METRICS_GAUGE;
import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.OFFSETS_BY_TOPIC_METRICS_GROUP;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * Copy from io.streamnative.connectors:pulsar-flink-connector_2.11:1.13.6.1-rc9,
 * From {@link org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher}.
 *
 * <p>Implements the logic around emitting records and tracking offsets,
 * as well as around the optional timestamp assignment and watermark generation.
 * The actual reading is delegated to one {@link ReaderThread} per {@link TopicRange};
 * those threads are spawned and supervised by {@link #runFetchLoop()}.
 *
 * @param <T> The type of elements deserialized from Pulsar messages, and emitted into
 *           the Flink data stream.
 */
public class PulsarFetcher<T> {

    private static final Logger log = LoggerFactory.getLogger(PulsarFetcher.class);
    private static final int NO_TIMESTAMPS_WATERMARKS = 0;
    private static final int WITH_WATERMARK_GENERATOR = 1;

    // ------------------------------------------------------------------------

    /** The source context to emit records and watermarks to. */
    protected final SourceContext<T> sourceContext;

    protected final Map<TopicRange, MessageId> seedTopicsWithInitialOffsets;
    protected final Set<TopicRange> excludeStartMessageIds;

    /** The lock that guarantees that record emission and state updates are atomic,
     * from the view of taking a checkpoint. */
    private final Object checkpointLock;

    /** All partitions (and their state) that this fetcher is subscribed to. */
    protected final List<PulsarTopicState<T>> subscribedPartitionStates;

    /**
     * Queue of partitions that are not yet assigned to any reader thread for consuming.
     *
     * <p>All partitions added to this queue are guaranteed to have been added
     * to {@link #subscribedPartitionStates} already.
     */
    protected final ClosableBlockingQueue<PulsarTopicState<T>> unassignedPartitionsQueue;

    /** The mode describing whether the fetcher also generates timestamps and watermarks. */
    private final int timestampWatermarkMode;

    /**
     * Optional watermark strategy that will be run per pulsar partition, to exploit per-partition
     * timestamp characteristics. The watermark strategy is kept in serialized form, to deserialize
     * it into multiple copies.
     */
    private final SerializedValue<WatermarkStrategy<T>> watermarkStrategy;

    /** User class loader used to deserialize watermark assigners. */
    private final ClassLoader userCodeClassLoader;

    private final StreamingRuntimeContext runtimeContext;

    protected final ClientConfigurationData clientConf;

    protected final Map<String, Object> readerConf;

    protected final String subscriptionName;

    protected final PulsarDeserializationSchema<T> deserializer;

    protected final int pollTimeoutMs;

    /** How many times a failed cursor commit is retried before the failure is reported. */
    private final int commitMaxRetries;

    protected final PulsarMetadataReader metadataReader;

    /**
     * Wrapper around our SourceContext for allowing the
     * {@link org.apache.flink.api.common.eventtime.WatermarkGenerator}
     * to emit watermarks and mark idleness.
     */
    protected final WatermarkOutput watermarkOutput;

    /**
     * {@link WatermarkOutputMultiplexer} for supporting per-partition watermark generation.
     */
    private final WatermarkOutputMultiplexer watermarkOutputMultiplexer;

    /** Flag to mark the main work loop as alive. */
    private volatile boolean running = true;

    /** The threads that run the actual reading and hand the records to this fetcher. */
    private Map<TopicRange, ReaderThread<T>> topicToThread;

    /** Whether to fail when data loss is detected (read from, and removed from, the reader config). */
    private final boolean failOnDataLoss;

    /** Read from, and removed from, the reader config; handed to every reader thread. */
    private final boolean useEarliestWhenDataLoss;

    /** Marker element pushed into {@link #unassignedPartitionsQueue} to wake up / stop the fetch loop. */
    private final PoisonState poisonState;

    // ------------------------------------------------------------------------
    // Metrics
    // ------------------------------------------------------------------------

    /**
     * Flag indicating whether or not metrics should be exposed.
     * If {@code true}, offset metrics (e.g. current offset, committed offset) and
     * pulsar-shipped metrics will be registered.
     */
    private final boolean useMetrics;

    /**
     * The metric group which all metrics for the source should be registered to.
     */
    private final MetricGroup consumerMetricGroup;

    /**
     * Creates a fetcher without excluded start message ids and with 3 commit retries.
     */
    public PulsarFetcher(
            SourceContext<T> sourceContext,
            Map<TopicRange, MessageId> seedTopicsWithInitialOffsets,
            SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
            ProcessingTimeService processingTimeProvider,
            long autoWatermarkInterval,
            ClassLoader userCodeClassLoader,
            StreamingRuntimeContext runtimeContext,
            ClientConfigurationData clientConf,
            Map<String, Object> readerConf,
            int pollTimeoutMs,
            PulsarDeserializationSchema<T> deserializer,
            PulsarMetadataReader metadataReader,
            MetricGroup consumerMetricGroup,
            boolean useMetrics,
            String subscriptionName) throws Exception {
        this(
                sourceContext,
                seedTopicsWithInitialOffsets,
                Collections.emptySet(),
                watermarkStrategy,
                processingTimeProvider,
                autoWatermarkInterval,
                userCodeClassLoader,
                runtimeContext,
                clientConf,
                readerConf,
                subscriptionName,
                pollTimeoutMs,
                3, // commit retries before fail
                deserializer,
                metadataReader,
                consumerMetricGroup,
                useMetrics);
    }

    /**
     * Creates the fetcher, builds the state holders for the seed partitions, queues them for
     * assignment to reader threads, registers offset metrics (if enabled) and starts the periodic
     * watermark emitter (if a watermark strategy and a positive interval are given).
     *
     * @throws IllegalArgumentException if a seed partition has no defined initial offset
     * @throws Exception if the watermark strategy cannot be deserialized or the poison marker
     *         cannot be obtained reflectively
     */
    public PulsarFetcher(
            SourceContext<T> sourceContext,
            Map<TopicRange, MessageId> seedTopicsWithInitialOffsets,
            Set<TopicRange> excludeStartMessageIds,
            SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
            ProcessingTimeService processingTimeProvider,
            long autoWatermarkInterval,
            ClassLoader userCodeClassLoader,
            StreamingRuntimeContext runtimeContext,
            ClientConfigurationData clientConf,
            Map<String, Object> readerConf,
            String subscriptionName,
            int pollTimeoutMs,
            int commitMaxRetries,
            PulsarDeserializationSchema<T> deserializer,
            PulsarMetadataReader metadataReader,
            MetricGroup consumerMetricGroup,
            boolean useMetrics) throws Exception {

        this.sourceContext = sourceContext;
        this.watermarkOutput = new SourceContextWatermarkOutputAdapter<>(sourceContext);
        this.subscriptionName = subscriptionName;
        this.watermarkOutputMultiplexer = new WatermarkOutputMultiplexer(watermarkOutput);
        this.useMetrics = useMetrics;
        this.consumerMetricGroup = checkNotNull(consumerMetricGroup);
        this.seedTopicsWithInitialOffsets = seedTopicsWithInitialOffsets;
        this.excludeStartMessageIds = excludeStartMessageIds;
        this.checkpointLock = sourceContext.getCheckpointLock();
        this.userCodeClassLoader = userCodeClassLoader;
        this.runtimeContext = runtimeContext;
        this.clientConf = clientConf;
        this.readerConf = readerConf == null ? new HashMap<>() : readerConf;
        // these helpers consume their keys from readerConf, so they must run before readers are created
        this.failOnDataLoss = SourceSinkUtils.getFailOnDataLossAndRemoveKey(this.readerConf);
        this.useEarliestWhenDataLoss = SourceSinkUtils.getUseEarliestWhenDataLossAndRemoveKey(this.readerConf);
        this.pollTimeoutMs = pollTimeoutMs;
        this.commitMaxRetries = commitMaxRetries;
        this.deserializer = deserializer;
        this.metadataReader = metadataReader;

        // figure out which watermark mode we will be using
        this.watermarkStrategy = watermarkStrategy;

        if (watermarkStrategy == null) {
            timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
        } else {
            timestampWatermarkMode = WITH_WATERMARK_GENERATOR;
        }

        this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();

        // initialize subscribed partition states with seed partitions
        this.subscribedPartitionStates = createPartitionStateHolders(
                seedTopicsWithInitialOffsets,
                timestampWatermarkMode,
                watermarkStrategy,
                userCodeClassLoader);

        // check that all seed partition states have a defined offset
        for (PulsarTopicState<T> state : subscribedPartitionStates) {
            if (!state.isOffsetDefined()) {
                throw new IllegalArgumentException(
                        "The fetcher was assigned seed partitions with undefined initial offsets.");
            }
        }

        // all seed partitions are not assigned yet, so should be added to the unassigned partitions queue
        for (PulsarTopicState<T> state : subscribedPartitionStates) {
            unassignedPartitionsQueue.add(state);
        }

        // register metrics for the initial seed partitions
        if (useMetrics) {
            registerOffsetMetrics(consumerMetricGroup, subscribedPartitionStates);
        }

        // if we have periodic watermarks, kick off the interval scheduler
        if (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) {
            PeriodicWatermarkEmitter<T> periodicEmitter = new PeriodicWatermarkEmitter<>(
                    checkpointLock,
                    subscribedPartitionStates,
                    watermarkOutputMultiplexer,
                    processingTimeProvider,
                    autoWatermarkInterval);

            periodicEmitter.start();
        }

        // obtain the shared poison marker; the INSTANCE field is read reflectively
        // (static field, so no receiver object is needed)
        Field poisonInstance = PoisonState.class.getDeclaredField("INSTANCE");
        poisonInstance.setAccessible(true);
        this.poisonState = (PoisonState) poisonInstance.get(null);
    }

    /**
     * Main loop: hands unassigned partitions to newly started reader threads until
     * {@link #cancel()} is called or a reader thread fails.
     *
     * <p>On exit (normal, cancellation or error) all spawned reader threads are cancelled and
     * joined on a best-effort basis.
     *
     * @throws Exception the first exception reported by a reader thread, or an
     *         {@link InterruptedException} if this thread was interrupted without such a cause
     */
    public void runFetchLoop() throws Exception {
        topicToThread = new HashMap<>();
        ExceptionProxy exceptionProxy = new ExceptionProxy(Thread.currentThread());

        try {

            while (running) {
                // re-throw any exception from the concurrent fetcher threads
                exceptionProxy.checkAndThrowException();

                // wait for max 5 seconds trying to get partitions to assign
                // if threads shut down, this poll returns earlier, because the threads inject the
                // special marker into the queue
                List<PulsarTopicState<T>> topicsToAssign = unassignedPartitionsQueue.getBatchBlocking(5000);
                // if there are more markers, remove them all
                topicsToAssign.removeIf(s -> s.equals(poisonState));

                if (!topicsToAssign.isEmpty()) {

                    if (!running) {
                        throw BreakingException.INSTANCE;
                    }

                    topicToThread.putAll(
                            createAndStartReaderThread(topicsToAssign, exceptionProxy));

                } else {
                    // there were no partitions to assign. Check if any consumer threads shut down.
                    // we get into this section of the code, if either the poll timed out, or the
                    // blocking poll was woken up by the marker element

                    topicToThread.values().removeIf(t -> !t.isRunning());

                }

                if (topicToThread.isEmpty() && unassignedPartitionsQueue.isEmpty()) {
                    // nothing to read and nothing queued: block until a partition or the marker arrives
                    PulsarTopicState<T> topicForBlocking = unassignedPartitionsQueue.getElementBlocking();
                    if (topicForBlocking.equals(poisonState)) {
                        throw BreakingException.INSTANCE;
                    }
                    topicToThread.putAll(
                            createAndStartReaderThread(ImmutableList.of(topicForBlocking), exceptionProxy));
                }
            }

        } catch (BreakingException b) {
            // do nothing: used only to leave the loop from nested positions
        } catch (InterruptedException e) {
            // this may be thrown because an exception on one of the concurrent fetcher threads
            // woke this thread up. make sure we throw the root exception instead in that case
            exceptionProxy.checkAndThrowException();

            // no other root exception, throw the interrupted exception
            throw e;
        } finally {
            running = false;

            // clear the interruption flag
            // this allows the joining on reader threads (on best effort) to happen even in
            // case the initial interrupt already happened
            Thread.interrupted();

            // make sure that in any case (completion, abort, error), all spawned threads are stopped
            try {
                int runningThreads;
                do { // check whether threads are alive and cancel them
                    runningThreads = 0;

                    topicToThread.values().removeIf(s -> !s.isAlive());

                    for (ReaderThread<T> t : topicToThread.values()) {
                        t.cancel();
                        runningThreads++;
                    }

                    if (runningThreads > 0) {
                        for (ReaderThread<T> t : topicToThread.values()) {
                            t.join(500 / runningThreads + 1);
                        }
                    }

                } while (runningThreads > 0);

            } catch (InterruptedException ignored) {
                // waiting for the thread shutdown apparently got interrupted
                // restore interrupted state and continue
                Thread.currentThread().interrupt();
            } catch (Throwable t) {
                // we catch all here to preserve the original exception
                log.error("Exception while shutting down reader threads", t);
            }
        }
    }

    // ------------------------------------------------------------------------
    // emitting records
    // ------------------------------------------------------------------------

    /**
     * Emits records attaching a timestamp to each of them, then advances the partition offset.
     *
     * <p>The queue is drained completely. Emission and the offset update happen under the
     * checkpoint lock so that a checkpoint never observes one without the other.
     *
     * @param records The records to emit; polled until empty
     * @param partitionState The state of the pulsar partition from which the record was fetched
     * @param offset The offset of the corresponding pulsar record
     * @param pulsarEventTimestamp The timestamp of the pulsar record
     */
    protected void emitRecordsWithTimestamps(
            Queue<T> records,
            PulsarTopicState<T> partitionState,
            MessageId offset,
            long pulsarEventTimestamp) {
        // emit the records, using the checkpoint lock to guarantee
        // atomicity of record emission and offset state update
        synchronized (checkpointLock) {
            T record;
            while ((record = records.poll()) != null) {
                long timestamp = partitionState.extractTimestamp(record, pulsarEventTimestamp);
                sourceContext.collectWithTimestamp(record, timestamp);

                // this might emit a watermark, so do it after emitting the record
                partitionState.onEvent(record, timestamp);
            }
            partitionState.setOffset(offset);
        }
    }

    /**
     * Signals the fetch loop to stop and wakes it up by enqueueing the poison marker
     * (if the queue is still open).
     */
    public void cancel() throws Exception {
        // signal the main thread to exit
        running = false;

        // make sure the main thread wakes up soon
        unassignedPartitionsQueue.addIfOpen(poisonState);
    }

    /**
     * Records the given offsets as committed on the matching subscribed partition states.
     * Partitions without an entry in {@code offset} are left untouched.
     */
    public void commitOffsetToState(Map<TopicRange, MessageId> offset) {
        for (PulsarTopicState<T> state : subscribedPartitionStates) {
            MessageId off = offset.get(state.getTopicRange());
            if (off != null) {
                state.setCommittedOffset(off);
            }
        }
    }

    /**
     * Commits the given offsets to Pulsar, skipping the symbolic
     * {@link MessageId#earliest} / {@link MessageId#latest} positions.
     */
    public void commitOffsetToPulsar(
            Map<TopicRange, MessageId> offset,
            PulsarCommitCallback offsetCommitCallback) {

        doCommitOffsetToPulsar(removeEarliestAndLatest(offset), offsetCommitCallback);
    }

    /**
     * Returns a new map containing only the entries whose message id is neither
     * {@link MessageId#earliest} nor {@link MessageId#latest}.
     */
    public Map<TopicRange, MessageId> removeEarliestAndLatest(Map<TopicRange, MessageId> offset) {
        Map<TopicRange, MessageId> result = new HashMap<>();
        for (Map.Entry<TopicRange, MessageId> entry : offset.entrySet()) {
            MessageId mid = entry.getValue();
            if (!mid.equals(MessageId.earliest) && !mid.equals(MessageId.latest)) {
                result.put(entry.getKey(), mid);
            }
        }
        return result;
    }

    /**
     * Subscribes to newly discovered topic ranges, starting from {@link MessageId#earliest}.
     *
     * <p>Offset metrics are registered for the new partitions (when metrics are enabled), exactly
     * as for the seed partitions, and the partitions are queued for assignment to reader threads.
     */
    public void addDiscoveredTopics(Set<TopicRange> newTopics) throws IOException, ClassNotFoundException {
        List<PulsarTopicState<T>> newStates = createPartitionStateHolders(
                newTopics.stream().collect(Collectors.toMap(t -> t, t -> MessageId.earliest)),
                timestampWatermarkMode,
                watermarkStrategy,
                userCodeClassLoader);

        // discovered partitions must expose the same offset gauges as the seed partitions
        if (useMetrics) {
            registerOffsetMetrics(consumerMetricGroup, newStates);
        }

        for (PulsarTopicState<T> state : newStates) {
            // the ordering is crucial here; first register the state holder, then
            // push it to the partitions queue to be read
            subscribedPartitionStates.add(state);
            unassignedPartitionsQueue.add(state);
        }
    }

    // ------------------------------------------------------------------------
    // snapshot and restore the state
    // ------------------------------------------------------------------------

    /**
     * Takes a snapshot of the partition offsets.
     *
     * <p>Important: This method must be called under the checkpoint lock.
     *
     * @return A map from partition to current offset.
     */
    public Map<TopicRange, MessageId> snapshotCurrentState() {
        // this method assumes that the checkpoint lock is held
        assert Thread.holdsLock(checkpointLock);

        Map<TopicRange, MessageId> state = new HashMap<>(subscribedPartitionStates.size());

        for (PulsarTopicState<T> pa : subscribedPartitionStates) {
            state.put(pa.getTopicRange(), pa.getOffset());
        }
        return state;
    }

    /**
     * Creates, names and starts one daemon reader thread per given partition state.
     *
     * @return the started threads keyed by their topic range
     */
    public Map<TopicRange, ReaderThread<T>> createAndStartReaderThread(
            List<PulsarTopicState<T>> states,
            ExceptionProxy exceptionProxy) {
        Map<TopicRange, ReaderThread<T>> topic2Threads = new HashMap<>();

        for (PulsarTopicState<T> state : states) {
            ReaderThread<T> readerT = createReaderThread(exceptionProxy, state);
            readerT.setName(String.format(
                    "Pulsar Reader for %s in task %s",
                    state.getTopicRange(),
                    runtimeContext.getTaskName()));
            readerT.setDaemon(true);
            readerT.start();
            log.info("Starting Thread {}", readerT.getName());
            topic2Threads.put(state.getTopicRange(), readerT);
        }
        return topic2Threads;
    }

    protected List<PulsarTopicState<T>> getSubscribedTopicStates() {
        return subscribedPartitionStates;
    }

    /**
     * Factory for the reader thread of one partition; overridable by subclasses.
     */
    protected ReaderThread<T> createReaderThread(ExceptionProxy exceptionProxy, PulsarTopicState state) {
        return new ReaderThread<>(
                this,
                state,
                clientConf,
                readerConf,
                deserializer,
                pollTimeoutMs,
                exceptionProxy,
                failOnDataLoss,
                useEarliestWhenDataLoss,
                excludeStartMessageIds.contains(state.getTopicRange()), subscriptionName);
    }

    /**
     * Utility method that takes the topic partitions and creates the topic partition state
     * holders, depending on the timestamp / watermark mode.
     *
     * <p>In {@link #WITH_WATERMARK_GENERATOR} mode the watermark strategy is deserialized once per
     * partition so every partition gets its own assigner/generator, and a per-partition output is
     * registered with the {@link WatermarkOutputMultiplexer}.
     *
     * @throws IllegalStateException if {@code timestampWatermarkMode} is unknown
     */
    private List<PulsarTopicState<T>> createPartitionStateHolders(
            Map<TopicRange, MessageId> partitionsToInitialOffsets,
            int timestampWatermarkMode,
            SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
            ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {

        // CopyOnWrite as adding discovered partitions could happen in parallel
        // while different threads iterate the partitions list
        List<PulsarTopicState<T>> partitionStates = new CopyOnWriteArrayList<>();

        switch (timestampWatermarkMode) {
            case NO_TIMESTAMPS_WATERMARKS: {
                for (Map.Entry<TopicRange, MessageId> partitionEntry : partitionsToInitialOffsets.entrySet()) {
                    PulsarTopicState<T> state = new PulsarTopicState<>(partitionEntry.getKey());
                    state.setOffset(partitionEntry.getValue());
                    partitionStates.add(state);
                }
                return partitionStates;
            }

            case WITH_WATERMARK_GENERATOR: {
                for (Map.Entry<TopicRange, MessageId> partitionEntry : partitionsToInitialOffsets.entrySet()) {
                    final TopicRange topicRange = partitionEntry.getKey();

                    PulsarTopicState<T> state = new PulsarTopicState<>(topicRange);
                    // a fresh copy per partition, so generators do not share mutable state
                    WatermarkStrategy<T> deserializedWatermarkStrategy = watermarkStrategy.deserializeValue(
                            userCodeClassLoader);

                    // the format of the ID does not matter, as long as it is unique
                    final String partitionId = state.getTopicRange().toString();
                    watermarkOutputMultiplexer.registerNewOutput(partitionId);
                    WatermarkOutput immediateOutput =
                            watermarkOutputMultiplexer.getImmediateOutput(partitionId);
                    WatermarkOutput deferredOutput =
                            watermarkOutputMultiplexer.getDeferredOutput(partitionId);

                    PulsarTopicPartitionStateWithWatermarkGenerator<T> partitionState =
                            new PulsarTopicPartitionStateWithWatermarkGenerator<>(
                                    topicRange,
                                    state,
                                    deserializedWatermarkStrategy.createTimestampAssigner(() -> consumerMetricGroup),
                                    deserializedWatermarkStrategy.createWatermarkGenerator(() -> consumerMetricGroup),
                                    immediateOutput,
                                    deferredOutput);

                    partitionState.setOffset(partitionEntry.getValue());

                    partitionStates.add(partitionState);
                }

                return partitionStates;
            }

            default:
                // cannot happen, add this as a guard for the future
                throw new IllegalStateException(
                        "Unknown timestamp/watermark mode: " + timestampWatermarkMode);
        }
    }

    // ------------------------- Metrics ----------------------------------

    /**
     * For each partition, register a new metric group to expose current offsets and committed offsets.
     * Per-partition metric groups can be scoped by user variables.
     *
     * <p>Note: this method also registers gauges for deprecated offset metrics, to maintain backwards compatibility.
     *
     * @param consumerMetricGroup The consumer metric group
     * @param partitionOffsetStates The partition offset state holders, whose values will be used to update metrics
     */
    private void registerOffsetMetrics(
            MetricGroup consumerMetricGroup,
            List<PulsarTopicState<T>> partitionOffsetStates) {

        for (PulsarTopicState<T> pts : partitionOffsetStates) {
            MetricGroup topicPartitionGroup = consumerMetricGroup
                    .addGroup(OFFSETS_BY_TOPIC_METRICS_GROUP, pts.getTopicRange().getTopic());

            topicPartitionGroup.gauge(
                    CURRENT_OFFSETS_METRICS_GAUGE, new OffsetGauge(pts, OffsetGaugeType.CURRENT_OFFSET));
            topicPartitionGroup.gauge(
                    COMMITTED_OFFSETS_METRICS_GAUGE, new OffsetGauge(pts, OffsetGaugeType.COMMITTED_OFFSET));
        }
    }

    /**
     * Gauge types.
     */
    private enum OffsetGaugeType {
        CURRENT_OFFSET,
        COMMITTED_OFFSET
    }

    /**
     * Gauge for getting the offset of a PulsarTopicState.
     */
    private static class OffsetGauge implements Gauge<MessageId> {

        private final PulsarTopicState<?> pts;
        private final OffsetGaugeType gaugeType;

        OffsetGauge(PulsarTopicState<?> pts, OffsetGaugeType gaugeType) {
            this.pts = pts;
            this.gaugeType = gaugeType;
        }

        @Override
        public MessageId getValue() {
            switch (gaugeType) {
                case COMMITTED_OFFSET:
                    return pts.getCommittedOffset();
                case CURRENT_OFFSET:
                    return pts.getOffset();
                default:
                    throw new RuntimeException("Unknown gauge type: " + gaugeType);
            }
        }
    }
    // ------------------------------------------------------------------------

    /**
     * The periodic watermark emitter. In its given interval, it checks all partitions for
     * the current event time watermark, and possibly emits the next watermark.
     */
    private static class PeriodicWatermarkEmitter<T> implements ProcessingTimeCallback {

        private final Object checkpointLock;

        private final List<PulsarTopicState<T>> allPartitions;

        private final WatermarkOutputMultiplexer watermarkOutputMultiplexer;

        private final ProcessingTimeService timerService;

        private final long interval;

        // -------------------------------------------------

        PeriodicWatermarkEmitter(
                Object checkpointLock,
                List<PulsarTopicState<T>> allPartitions,
                WatermarkOutputMultiplexer watermarkOutputMultiplexer,
                ProcessingTimeService timerService,
                long autoWatermarkInterval) {
            this.checkpointLock = checkpointLock;
            this.allPartitions = checkNotNull(allPartitions);
            this.watermarkOutputMultiplexer = watermarkOutputMultiplexer;
            this.timerService = checkNotNull(timerService);
            this.interval = autoWatermarkInterval;
        }

        // -------------------------------------------------

        /** Schedules the first emission one interval from now. */
        public void start() {
            timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
        }

        @Override
        public void onProcessingTime(long timestamp) throws Exception {
            synchronized (checkpointLock) {
                for (PulsarTopicState<?> state : allPartitions) {
                    state.onPeriodicEmit();
                }

                watermarkOutputMultiplexer.onPeriodicEmit();
            }

            // schedule the next watermark
            timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
        }
    }

    /**
     * Control-flow exception used to leave the fetch loop. Shared as a singleton, so it
     * neither records a stack trace nor supports suppression.
     */
    private static class BreakingException extends Exception {

        private static final long serialVersionUID = 1L;

        static final BreakingException INSTANCE = new BreakingException();

        private BreakingException() {
            super(null, null, false, false);
        }
    }

    /**
     * Commits the given offsets to the Pulsar cursor through {@link PulsarMetadataReader},
     * retrying up to {@code commitMaxRetries} times with a one second pause between attempts.
     *
     * <p>Outcomes:
     * <ul>
     *   <li>success: the offsets are recorded as committed on the partition states, then
     *       {@code onSuccess} is invoked;</li>
     *   <li>failure while running: {@code onException} is invoked and the committed offsets are
     *       NOT updated, since they never reached Pulsar;</li>
     *   <li>fetcher stopped (before or during the commit): returns silently.</li>
     * </ul>
     * If the retry pause is interrupted, the interrupt flag is restored before reporting.
     */
    protected void doCommitOffsetToPulsar(
            Map<TopicRange, MessageId> offset,
            PulsarCommitCallback offsetCommitCallback) {

        try {
            int retries = 0;
            boolean success = false;
            while (running) {
                try {
                    metadataReader.commitOffsetToCursor(offset);
                    success = true;
                    break;
                } catch (Exception e) {
                    log.warn("Failed to commit cursor to Pulsar.", e);
                    if (retries >= commitMaxRetries) {
                        // retries counts the re-tries; the initial attempt adds one more
                        log.error("Failed to commit cursor to Pulsar after {} attempts", retries + 1);
                        throw e;
                    }
                    retries += 1;
                    Thread.sleep(1000);
                }
            }
            if (success) {
                // the commit reached Pulsar: reflect it in the committed-offset state first
                commitOffsetToState(offset);
                offsetCommitCallback.onSuccess();
            }
            // otherwise the fetcher was stopped before the commit went through
        } catch (InterruptedException e) {
            // keep the interrupt visible to the calling thread
            Thread.currentThread().interrupt();
            if (running) {
                offsetCommitCallback.onException(e);
            }
        } catch (Exception e) {
            if (running) {
                offsetCommitCallback.onException(e);
            }
        }
    }

    public PulsarMetadataReader getMetaDataReader() {
        return this.metadataReader;
    }
}
